package idx

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sourcegraph/sourcegraph/pkg/api"
	"golang.org/x/net/context/ctxhttp"
	log15 "gopkg.in/inconshreveable/log15.v2"
)

type qitem struct {
	repo api.RepoName
	rev  string
}

type workQueue struct {
	enqueue chan<- qitem          // channel of inputs
	dequeue chan<- (chan<- qitem) // channel of task executors
}

func NewQueue(lengthGauge prometheus.Gauge) workQueue {
	enqueue, dequeue := queueWithoutDuplicates(lengthGauge)
	return workQueue{enqueue: enqueue, dequeue: dequeue}
}

// Enqueue adds an item to the queue and immediately returns.
func (w *workQueue) Enqueue(repo api.RepoName, rev string) {
	w.enqueue <- qitem{repo: repo, rev: rev}
}

// queueWithoutDuplicates provides a queue that ignores a new entry if it is already enqueued.  Consumers of the queue
// should push a `chan qitem` onto dequeue and then immediately read from the same channel. Sending to the dequeue
// channel blocks if the queue size is zero.
func queueWithoutDuplicates(lengthGauge prometheus.Gauge) (enqueue chan<- qitem, dequeue chan<- (chan<- qitem)) {
	var queue []qitem
	set := make(map[qitem]struct{})
	enqueueChan := make(chan qitem)
	dequeueChan := make(chan (chan<- qitem))

	go func() {
		for {
			if len(queue) == 0 {
				repoRev := <-enqueueChan
				queue = append(queue, repoRev)
				set[repoRev] = struct{}{}
				if lengthGauge != nil {
					lengthGauge.Set(float64(len(queue)))
				}
			}

			select {
			case repoRev := <-enqueueChan:
				if _, ok := set[repoRev]; ok {
					continue // duplicate, discard
				}
				queue = append(queue, repoRev)
				set[repoRev] = struct{}{}
				if lengthGauge != nil {
					lengthGauge.Set(float64(len(queue)))
				}
			case c := <-dequeueChan:
				repoRev := queue[0]
				queue = queue[1:]
				delete(set, repoRev)
				if lengthGauge != nil {
					lengthGauge.Set(float64(len(queue)))
				}
				c <- repoRev
			}
		}
	}()

	return enqueueChan, dequeueChan
}

// SecondaryQueue returns a read-only channel from which the reader receives a stream of repositories to index in the
// background. This stream is generated by querying for the first 100 repositories with no indexed_revision, ordered by
// creation date.
func SecondaryQueue(ctx context.Context) <-chan qitem {
	c := make(chan qitem)
	go func() {
		for {
			resp, err := listRepos(ctx)
			if err != nil {
				log15.Error("Could not list unindexed repositories", "err", err)
				time.Sleep(5 * time.Second)
				continue
			}

			for _, rp := range resp.Data.Repositories.Nodes {
				c <- qitem{repo: api.RepoName(rp.Name)}
			}

			time.Sleep(30 * time.Second)
		}
	}()
	return c
}

const gqlSearchQuery = `query {
  repositories(orderBy:REPOSITORY_CREATED_AT, notCIIndexed:true, descending:true, first:1000) {
    nodes {
      name
    }
  }
}
`

type gqlSearchResponse struct {
	Data struct {
		Repositories struct {
			Nodes []struct {
				Name string
			}
		}
	}
	Errors []interface{}
}

func listRepos(ctx context.Context) (*gqlSearchResponse, error) {
	var buf bytes.Buffer
	err := json.NewEncoder(&buf).Encode(graphQLQuery{
		Query: gqlSearchQuery,
	})
	if err != nil {
		return nil, errors.Wrap(err, "Encode")
	}

	url, err := gqlURL("ListRepos")
	if err != nil {
		return nil, errors.Wrap(err, "constructing frontend URL")
	}

	resp, err := ctxhttp.Post(ctx, nil, url, "application/json", &buf)
	if err != nil {
		return nil, errors.Wrap(err, "Post")
	}
	defer resp.Body.Close()

	var res *gqlSearchResponse
	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
		return nil, errors.Wrap(err, "Decode")
	}
	if len(res.Errors) > 0 {
		return res, fmt.Errorf("graphql: errors: %v", res.Errors)
	}
	return res, nil
}
